package com.jourwon.spring.boot.controller;

import com.jourwon.spring.boot.model.KafkaConsumerInfo;
import com.jourwon.spring.boot.service.KafkaConsumerRegistryService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.List;

/**
 * REST endpoints for managing Kafka consumers.
 *
 * <p>All routes live under {@code /api/kafka/registry}. Each handler is a thin
 * pass-through: it forwards its request parameters unchanged to
 * {@link KafkaConsumerRegistryService} and returns whatever the service returns.
 * No validation or error translation happens in this class.
 *
 * @author JourWon
 * @date 2022/3/22
 */
@Slf4j
@RestController
@Api(tags = "kafka消费者管理接口")
@RequestMapping("/api/kafka/registry")
public class KafkaConsumerRegistryController {

    /**
     * Service that every endpoint delegates to.
     */
    @Resource
    private KafkaConsumerRegistryService kafkaConsumerRegistryService;

    /**
     * {@code GET /api/kafka/registry} - lists the Kafka consumers known to the registry service.
     *
     * @return the list produced by {@link KafkaConsumerRegistryService#listConsumerId()}
     */
    @ApiOperation("获取kafka消费者列表")
    @GetMapping
    public List<KafkaConsumerInfo> listConsumerId() {
        final List<KafkaConsumerInfo> consumers = this.kafkaConsumerRegistryService.listConsumerId();
        return consumers;
    }

    /**
     * {@code POST /api/kafka/registry/register} - registers a Kafka consumer.
     *
     * @param topic            topic request parameter, forwarded to the service as-is
     * @param listenerClass    listener class request parameter, forwarded to the service as-is
     *                         (presumably a class name - confirm against the service)
     * @param startImmediately start-immediately flag, forwarded to the service as-is
     * @return the string returned by the service (presumably the new consumer's id - confirm
     *         against the service implementation)
     */
    @ApiOperation("注册kafka消费者")
    @PostMapping("/register")
    public String registeConsumer(@RequestParam String topic,
                                  @RequestParam String listenerClass,
                                  @RequestParam boolean startImmediately) {
        final String registrationResult =
                this.kafkaConsumerRegistryService.registeConsumer(topic, listenerClass, startImmediately);
        return registrationResult;
    }

    /**
     * {@code POST /api/kafka/registry/start} - asks the service to start a consumer.
     *
     * @param consumerId id of the consumer, forwarded to the service as-is
     */
    @ApiOperation("启用kafka消费者")
    @PostMapping("/start")
    public void startConsumer(@RequestParam String consumerId) {
        this.kafkaConsumerRegistryService.startConsumer(consumerId);
    }

    /**
     * {@code POST /api/kafka/registry/pause} - asks the service to pause a consumer.
     *
     * @param consumerId id of the consumer, forwarded to the service as-is
     */
    @ApiOperation("暂停kafka消费者")
    @PostMapping("/pause")
    public void pauseConsumer(@RequestParam String consumerId) {
        this.kafkaConsumerRegistryService.pauseConsumer(consumerId);
    }

    /**
     * {@code POST /api/kafka/registry/resume} - asks the service to resume a consumer.
     *
     * @param consumerId id of the consumer, forwarded to the service as-is
     */
    @ApiOperation("恢复kafka消费者")
    @PostMapping("/resume")
    public void resumeConsumer(@RequestParam String consumerId) {
        this.kafkaConsumerRegistryService.resumeConsumer(consumerId);
    }

    /**
     * {@code POST /api/kafka/registry/stop} - asks the service to stop a consumer.
     *
     * @param consumerId id of the consumer, forwarded to the service as-is
     */
    @ApiOperation("停用kafka消费者")
    @PostMapping("/stop")
    public void stopConsumer(@RequestParam String consumerId) {
        this.kafkaConsumerRegistryService.stopConsumer(consumerId);
    }

}
